#include <sys/stat.h>
#include <stdarg.h>
#include "blockstore.h"
+#include <pthread.h>
#include "parallax-threaded.h"
#define BLOCKSTORE_REMOTE
-//#define BSDEBUG
+#define BSDEBUG
/*****************************************************************************
* Debugging
void DB(char *format, ...)
{
va_list args;
-
+ fprintf(stderr, "[%05u] ", (int)pthread_getspecific(tid_key));
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
#include <netinet/in.h>
#include <netdb.h>
-/*****************************************************************************
- * *
- *****************************************************************************/
-
/*****************************************************************************
* Network state *
*****************************************************************************/
/* Protects the queue manipulation critcal regions.
*/
-#define ENTER_QUEUE_CR (void)0
-#define LEAVE_QUEUE_CR (void)0
+pthread_mutex_t ptmutex_queue;
+#define ENTER_QUEUE_CR pthread_mutex_lock(&ptmutex_queue)
+#define LEAVE_QUEUE_CR pthread_mutex_unlock(&ptmutex_queue)
+
+pthread_mutex_t ptmutex_recv;
+#define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv)
+#define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv)
+
+int notify = 0;
+pthread_mutex_t ptmutex_notify;
+pthread_cond_t ptcv_notify;
+#define RECV_NOTIFY { \
+ pthread_mutex_lock(&ptmutex_notify); \
+ notify = 1; \
+ pthread_cond_signal(&ptcv_notify); \
+ pthread_mutex_unlock(&ptmutex_notify); }
+#define RECV_AWAIT { \
+ pthread_mutex_lock(&ptmutex_notify); \
+ if (notify) \
+ notify = 0; \
+ else \
+ pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \
+ pthread_mutex_unlock(&ptmutex_notify); }
+
/* A message queue entry. We allocate one of these for every request we send.
* Asynchronous reply reception also used one of these.
#define BSQ_STATUS_MATCHED 1
-#define ENTER_LUID_CR (void)0
-#define LEAVE_LUID_CR (void)0
+pthread_mutex_t ptmutex_luid;
+#define ENTER_LUID_CR pthread_mutex_lock(&ptmutex_luid)
+#define LEAVE_LUID_CR pthread_mutex_unlock(&ptmutex_luid)
static u64 luid_cnt = 0x1000ULL;
u64 new_luid(void) {
return q;
}
+/*****************************************************************************
+ * Network communication *
+ *****************************************************************************/
+
int send_message(bsq_t *qe) {
int rc;
bsq_t *recv_any(void) {
struct sockaddr_in from;
int rc;
-
+
DB("ENTER recv_any\n");
rx_qe.msghdr.msg_name = &from;
perror("recv_any");
return NULL;
}
+
rx_qe.length = rc;
rx_qe.server = get_server_number(&from);
return numreqs;
}
+ RECV_AWAIT;
+
+ /*
rxagain:
+ ENTER_RECV_CR;
q = recv_any();
+ LEAVE_RECV_CR;
if (!q)
return -1;
fprintf(stderr, "Unmatched RX\n");
goto rxagain;
}
+ */
goto checkmatch;
}
+/* receive loop
+ */
+void *receive_loop(void *arg)
+{
+ bsq_t *q, *m;
+
+ for(;;) {
+ q = recv_any();
+ if (!q) {
+ fprintf(stderr, "recv_any error\n");
+ }
+ else {
+ m = queuesearch(q);
+ recv_recycle_buffer(q);
+ if (!m) {
+ fprintf(stderr, "Unmatched RX\n");
+ }
+ else {
+ DB("RX MATCH");
+ RECV_NOTIFY;
+ }
+ }
+ }
+}
+pthread_t pthread_recv;
+
+/*****************************************************************************
+ * Reading *
+ *****************************************************************************/
+
void *readblock_indiv(int server, u64 id) {
void *block;
bsq_t *qe;
return block;
}
+/*****************************************************************************
+ * Writing *
+ *****************************************************************************/
+
bsq_t *writeblock_indiv(int server, u64 id, void *block) {
bsq_t *qe;
return -1;
}
+/*****************************************************************************
+ * Allocation *
+ *****************************************************************************/
+
/**
* allocblock: write a new block to disk
* @block: pointer to block
#else /* /BLOCKSTORE_REMOTE */
+/*****************************************************************************
+ * Local storage version *
+ *****************************************************************************/
/**
* readblock: read a block from disk
#endif /* BLOCKSTORE_REMOTE */
+/*****************************************************************************
+ * Memory management *
+ *****************************************************************************/
+
/**
* newblock: get a new in-memory block set to zeros
*
printf("Total of %Ld ids on freelist.\n", total);
}
+/*****************************************************************************
+ * Initialisation *
+ *****************************************************************************/
+
int __init_blockstore(void)
{
int i;
#ifdef BLOCKSTORE_REMOTE
struct hostent *addr;
+
+ pthread_mutex_init(&ptmutex_queue, NULL);
+ pthread_mutex_init(&ptmutex_luid, NULL);
+ pthread_mutex_init(&ptmutex_recv, NULL);
+ pthread_mutex_init(&ptmutex_notify, NULL);
+ pthread_cond_init(&ptcv_notify, NULL);
+
bsservers[0].hostname = "firebug.cl.cam.ac.uk";
bsservers[1].hostname = "planb.cl.cam.ac.uk";
bsservers[2].hostname = "simcity.cl.cam.ac.uk";
return -1;
}
+ pthread_create(&pthread_recv, NULL, receive_loop, NULL);
+
#else /* /BLOCKSTORE_REMOTE */
block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644);
#endif /* BLOCKSTORE_REMOTE */
return 0;
}
+
+void __exit_blockstore(void)
+{
+ pthread_mutex_destroy(&ptmutex_recv);
+ pthread_mutex_destroy(&ptmutex_luid);
+ pthread_mutex_destroy(&ptmutex_queue);
+ pthread_mutex_destroy(&ptmutex_notify);
+ pthread_cond_destroy(&ptcv_notify);
+}